{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kubeflow Pipelines"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The [Kubeflow Pipelines SDK](https://github.com/kubeflow/pipelines/tree/master/sdk) provides a set of Python packages that you can use to specify and run your machine learning (ML) workflows. A pipeline is a description of an ML workflow, including all of the components that make up the steps in the workflow and how the components interact with each other.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The Kubeflow website explains Kubeflow Pipelines components in detail. See [Introduction to the Pipelines SDK](https://www.kubeflow.org/docs/pipelines/sdk/sdk-overview/) for more information."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Install the Kubeflow Pipelines SDK"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This guide tells you how to install the [Kubeflow Pipelines SDK](https://github.com/kubeflow/pipelines/tree/master/sdk) which you can use to build machine learning pipelines. You can use the SDK to execute your pipeline, or alternatively you can upload the pipeline to the Kubeflow Pipelines UI for execution.\n",
    "\n",
    "All of the SDK’s classes and methods are described in the auto-generated [SDK reference docs](https://kubeflow-pipelines.readthedocs.io/en/latest/).\n"
   ]
  },
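  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the following commands to install the Kubeflow Pipelines SDK:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use %env so the setting persists in the kernel and is inherited by later `!` commands;\n",
    "# a bare `!VAR=value` only sets the variable in a throwaway subshell.\n",
    "%env PYTHONWARNINGS=ignore::yaml.YAMLLoadWarning"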
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install https://storage.googleapis.com/ml-pipeline/release/0.1.29/kfp.tar.gz --upgrade --user"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Restart the kernel to pick up pip installed libraries\n",
    "from IPython.core.display import HTML\n",
    "\n",
    "HTML(\"<script>Jupyter.notebook.kernel.restart()</script>\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Build a Simple Pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example, we want to calculate the sum of four numbers: `a`, `b`, `c`, and `d`.\n",
    "\n",
    "1. Assume we have a Python image to use. Each step accepts two arguments and returns their sum.\n",
    "\n",
    "2. The sum of `a` and `b` is added to the sum of `c` and `d` to produce the final result. In total, the pipeline has three addition steps, followed by an echo step that prints the result."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create a Container Image for Each Component\n",
    "\n",
    "This section assumes that you have already created a program to perform the task required in a particular step of your ML workflow. For example, if the task is to train an ML model, then you must have a program that does the training.\n",
    "\n",
    "Your component can create `outputs` that the downstream components can use as `inputs`. These dependencies are used to build the pipeline's directed acyclic graph (DAG).\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> In this case, we use a stock Python base image to do the calculation and skip building our own image."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create a Python Function to Wrap Your Component\n",
    "\n",
    "Define a Python function to describe the interactions with the Docker container image that contains your pipeline component.\n",
    "\n",
    "To keep the example simple, the addition is written inline and passed to the stock Python image as a command. In a real workflow, you would build a new container image whenever your code changes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp\n",
    "from kfp import dsl\n",
    "\n",
    "\n",
    "def add_two_numbers(a, b):\n",
    "    \"\"\"Return a step that adds a and b and writes the sum to a file.\"\"\"\n",
    "    return dsl.ContainerOp(\n",
    "        name=\"calculate_sum\",\n",
    "        image=\"python:3.6.8\",\n",
    "        command=[\"python\", \"-c\"],\n",
    "        arguments=['with open(\"/tmp/results.txt\", \"a\") as file: file.write(str({} + {}))'.format(a, b)],\n",
    "        # Expose the file contents as the step's `data` output for downstream steps.\n",
    "        file_outputs={\n",
    "            \"data\": \"/tmp/results.txt\",\n",
    "        },\n",
    "    )\n",
    "\n",
    "\n",
    "def echo_op(text):\n",
    "    \"\"\"Return a step that prints the given text.\"\"\"\n",
    "    return dsl.ContainerOp(\n",
    "        name=\"echo\", image=\"library/bash:4.4.23\", command=[\"sh\", \"-c\"], arguments=['echo \"Result: {}\"'.format(text)]\n",
    "    )"
   ]
  },
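  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see what command the container receives, the short sketch below renders the same template string with literal values in plain Python. It does not use the Kubeflow Pipelines SDK, and the names `template` and `rendered` are illustrative only. Inside a pipeline function, `a` and `b` are pipeline parameters rather than literals, so the compiled definition should contain placeholders that are filled in at run time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Plain-Python illustration only; no Kubeflow installation is required.\n",
    "# `template` is the same string passed to `arguments` in add_two_numbers.\n",
    "template = 'with open(\"/tmp/results.txt\", \"a\") as file: file.write(str({} + {}))'\n",
    "\n",
    "# With literal inputs, this is the one-line program that `python -c` would run.\n",
    "rendered = template.format(7, 10)\n",
    "print(rendered)"
   ]
  },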
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Define Your Pipeline as a Python Function\n",
    "\n",
    "Describe each pipeline as a Python function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(name=\"Calculate sum pipeline\", description=\"Calculate sum of numbers and prints the result.\")\n",
    "def calculate_sum(a=7, b=10, c=4, d=7):\n",
    "    \"\"\"A four-step pipeline with first two running in parallel.\"\"\"\n",
    "\n",
    "    sum1 = add_two_numbers(a, b)\n",
    "    sum2 = add_two_numbers(c, d)\n",
    "    # Named `total` to avoid shadowing the built-in `sum`; it consumes both earlier outputs.\n",
    "    total = add_two_numbers(sum1.output, sum2.output)\n",
    "\n",
    "    echo_op(total.output)"
   ]
  },
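  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The data flow of the pipeline can be checked without a cluster. The sketch below is a plain-Python stand-in, not SDK code: the hypothetical helper `add` plays the role of one container step, and the inputs are the default arguments from the pipeline signature. The first two additions do not depend on each other, which is why they can run in parallel; the third needs both results, and the echo step needs the third."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Plain-Python stand-in for the pipeline; `add` is a hypothetical helper, not part of kfp.\n",
    "def add(a, b):\n",
    "    return a + b\n",
    "\n",
    "\n",
    "first = add(7, 10)  # independent of `second`\n",
    "second = add(4, 7)  # independent of `first`\n",
    "combined = add(first, second)  # needs both earlier results\n",
    "\n",
    "# Mirrors the message format used by the echo step.\n",
    "print(\"Result: {}\".format(combined))"
   ]
  },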
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Compile the Pipeline\n",
    "\n",
    "Compile the pipeline to generate a compressed YAML definition of the pipeline. The Kubeflow Pipelines service converts the static configuration into a set of Kubernetes resources for execution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "kfp.compiler.Compiler().compile(calculate_sum, \"calculate-sum-pipeline.zip\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ls -al ./calculate-sum-pipeline.zip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!unzip -o ./calculate-sum-pipeline.zip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pygmentize pipeline.yaml"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Deploy Pipeline\n",
    "\n",
    "There are two ways to deploy the pipeline: upload the generated `.zip` file through the Kubeflow Pipelines UI, or use the Kubeflow Pipelines SDK to deploy it.\n",
    "\n",
    "Only the SDK approach is shown here."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = kfp.Client()\n",
    "\n",
    "experiment = client.create_experiment(name=\"kubeflow\")\n",
    "\n",
    "my_run = client.run_pipeline(experiment.id, \"calculate-sum-pipeline\", \"calculate-sum-pipeline.zip\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
